In [ ]:
from IPython.display import Image, SVG
- id: Unique identifier of the customer
- name: Name of the customer
- transactions: List of transaction-id, amount pairs, one for each transaction for the customer in that file
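For reference, a single record might look roughly like the following (a made-up, illustrative example; the exact layout of each transaction entry may differ):

{"id": 0,
 "name": "Alice",
 "transactions": [{"transaction-id": 1, "amount": 250},
                  {"transaction-id": 2, "amount": -53}]}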
In [ ]:
from accounts import create_accounts_json
num_files = 25
n = 100000 # number of accounts per file
k = 500 # number of transactions
create_accounts_json(num_files, n, k)
In [ ]:
from nfs import create_denormalized
create_denormalized()
In [ ]:
from random_array import random_array
random_array()
Dask is a flexible parallel computing library for analytics. It provides parallel collections (arrays, bags, dataframes) together with task schedulers for a single machine and, via dask.distributed, for clusters.
In [ ]:
Image("http://dask.pydata.org/en/latest/_images/collections-schedulers.png")
In [ ]:
SVG("http://dask.pydata.org/en/latest/_images/dask-array-black-text.svg")
Dask array supports most of the NumPy interface, including:
- Arithmetic and scalar mathematics: +, *, exp, log, ...
- Reductions along axes: sum(), mean(), std(), sum(axis=0), ...
- Tensor contractions / dot products / matrix multiply: tensordot
- Axis reordering: transpose
- Slicing: x[:100, 500:100:-2]
- Fancy indexing along single axes with lists or NumPy arrays: x[:, [10, 1, 5]]
- The array protocol: __array__
- Some linear algebra: svd, qr, solve, solve_triangular, lstsq
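A small sketch of a few of these operations on a toy array (the shapes and chunk sizes here are arbitrary):

import dask.array as da

a = da.ones((1000, 1000), chunks=(250, 250))
b = (a + a.T).mean(axis=0)        # elementwise arithmetic, transpose, reduction
c = a[:100, ::2]                  # slicing
d = da.tensordot(a, a, axes=1)    # tensor contraction / matrix multiply
b.compute()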
In [ ]:
import dask.array as da
The chunks argument is important and has performance implications: it determines how the array is split into blocks, and hence how much parallelism is available and how much memory each task needs.
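For example (a small sketch; sizes are arbitrary), you can inspect how an array is blocked:

z = da.ones((10000, 10000), chunks=(1000, 1000))
z.chunks      # block sizes along each dimension
z.numblocks   # (10, 10): one block per 1000 x 1000 tile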
In [ ]:
x = da.arange(25, chunks=5)
In [ ]:
y = x ** 2
In [ ]:
y
In [ ]:
y.visualize()
In [ ]:
y.dask.keys()
Call compute() to evaluate the result.
In [ ]:
y.compute()
Dask arrays also implement the __array__ protocol, so NumPy can convert them to concrete arrays:
In [ ]:
import numpy as np

np.array(y)
compute accepts a get keyword that selects the scheduler. dask.get is an alias for the synchronous (single-threaded) backend, which is useful for debugging.
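For instance (a sketch with a deliberately failing function, not part of the data used here), errors under the synchronous scheduler are raised in the main process, so pdb or IPython's %debug can inspect them directly:

def fragile(block):
    raise ValueError("boom")     # placeholder failure for illustration

bad = x.map_blocks(fragile)
# bad.compute(get=dask.get)      # fails in this process, easy to step into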
In [ ]:
import dask

y.compute(get=dask.get)
dask.threaded.get, the threaded scheduler, is the default for dask arrays.
In [ ]:
y.compute(get=dask.threaded.get)
In [ ]:
from multiprocessing import cpu_count
cpu_count()
The multiprocessing scheduler starts worker processes with fork. fork creates a new child process which is a copy(-on-write) of the parent process.
In [ ]:
import dask.multiprocessing

y.compute(get=dask.multiprocessing.get)
For scaling beyond a single machine, the dask.distributed library provides a scheduler that runs across a cluster.
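A minimal sketch of how the distributed scheduler is typically used (assuming the distributed package is installed):

from dask.distributed import Client

client = Client()            # start a local scheduler and worker processes
y.compute(get=client.get)    # route the computation through the distributed scheduler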
In [ ]:
import h5py
import os
f = h5py.File(os.path.join('..', 'data', 'random.hdf5'))
dset = f['/x']
In [ ]:
import numpy as np

sums = []
for i in range(0, 1000000000, 1000000):
    chunk = dset[i: i + 1000000]   # pull one million elements at a time
    sums.append(chunk.sum())

total = np.sum(sums)
print(total / 1e9)
In [ ]:
x = da.from_array(dset, chunks=(1000000, ))
x
In [ ]:
result = x.mean()
In [ ]:
result
In [ ]:
result.compute()
In [ ]:
x[:10].compute()
Use dask.array.random.normal to create a 20,000 x 20,000 array $X \sim N(10, 0.1)$ with chunks set to (1000, 1000).
Take the mean along axis 0, keeping every 100th element.
Hint: recall that you can slice with the syntax [start:end:step].
In [ ]:
# [Solution here]
In [ ]:
%load solutions/dask_array.py
Your performance may vary. If you attempt the NumPy version then please ensure that you have more than 4GB of main memory.
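A quick back-of-the-envelope check of that figure:

20000 * 20000 * 8 / 1e9   # float64 elements at 8 bytes each: about 3.2 GB, before any temporaries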
In [ ]:
import numpy as np
In [ ]:
%%time
x = np.random.normal(10, 0.1, size=(20000, 20000))
y = x.mean(axis=0)[::100]
y
The dask version is faster and needs only megabytes of memory.
In [ ]:
%%time
x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0)[::100]
y.compute()
Dask array also provides parallel linear algebra routines, for example:
- da.linalg.qr
- da.linalg.cholesky
- da.linalg.svd
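A sketch of the SVD on a tall-and-skinny dask array (sizes are arbitrary; the parallel algorithm expects the second dimension to fit in a single chunk):

X = da.random.normal(0, 1, size=(100000, 200), chunks=(10000, 200))
u, s, v = da.linalg.svd(X)
s.compute()                  # singular values, computed block-wise in parallel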
Dask bag provides operations like map, filter, fold, frequencies and groupby over collections of generic Python objects.
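A tiny sketch of these operations on an in-memory bag (values are arbitrary):

import dask.bag as db

seq = db.from_sequence(range(10))
seq.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2).compute()   # [0, 4, 16, 36, 64]
seq.frequencies().compute()                                        # how often each element occurs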
In [ ]:
import os
import dask.bag as db
In [ ]:
bag = db.read_text(os.path.join('..', 'data', 'accounts.*.json.gz'))
In [ ]:
bag.take(3)
In [ ]:
import json
In [ ]:
js = bag.map(json.loads)
In [ ]:
js.take(3)
In [ ]:
counts = js.pluck('name').frequencies()
In [ ]:
counts.compute()
Use filter and take to get all of the transactions for the first five users named "Alice".
Create a function count_transactions that takes a dictionary from accounts and returns a dictionary that holds the name and a key count that is the number of transactions for that user id.
Use filter to get the accounts where the user is named Alice, and map the function you just created to get the number of transactions for each user named Alice. pluck the count and display the first 5.
In [ ]:
%load solutions/bag_alice.py
In [ ]:
b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank'])
b.groupby(len).compute()
In [ ]:
b = db.from_sequence(list(range(10)))
b.groupby(lambda x: x % 2).compute()
Group by evens and odds and take the largest value.
In [ ]:
b.groupby(lambda x: x % 2).map(lambda kv: (kv[0], max(kv[1]))).compute()
foldby is similar to the combineByKey method on RDD in Spark.
When using foldby you provide: a key function on which to group elements, a binary operator to combine an accumulated value with each element within a partition, and a combine binary operator that merges the per-partition results.
Your reduction must be associative. It will happen in parallel in each of the partitions of your dataset. Then all of these intermediate results will be combined by the combine binary operator.
This is just what we saw with sum above: partial results are computed per chunk and then combined.
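A small sketch (values arbitrary) that spells out the three pieces:

nums = db.from_sequence(range(10), npartitions=2)
nums.foldby(lambda x: x % 2,                 # key function
            binop=lambda acc, x: acc + x,    # reduce within each partition
            initial=0,
            combine=lambda a, b: a + b,      # merge the per-partition results
            combine_initial=0).compute()     # [(0, 20), (1, 25)], order may vary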
functools.reduce works like so:
In [ ]:
import functools
In [ ]:
values = range(10)
In [ ]:
def func(acc, y):
    print(acc)
    print(y)
    print()
    return acc + y
In [ ]:
functools.reduce(func, values)
In [ ]:
b.foldby(lambda x: x % 2, binop=max, combine=max).compute()
Using the accounts data above, find the number of people with the same name
In [ ]:
js.take(1)
In [ ]:
from dask.diagnostics import ProgressBar
In [ ]:
counts = js.foldby(key='name',
                   binop=lambda total, x: total + 1,
                   initial=0,
                   combine=lambda a, b: a + b,
                   combine_initial=0)
In [ ]:
with ProgressBar():
    result = counts.compute()
In [ ]:
result
In [ ]:
%load solutions/bag_foldby.py
<img src="http://dask.pydata.org/en/latest/_images/dask-dataframe.svg" width="30%">
Trivially parallelizable operations (fast):
- df.x + df.y, df * df
- df[df.x > 0]
- df.loc[4.0:10.5]
- df.x.max(), df.max()
- df[df.x.isin([1, 2, 3])]
- df.timestamp.month

Cleverly parallelizable operations (fast):
- df.groupby(df.x).y.max(), df.groupby('x').max()
- df.x.value_counts()
- df.x.drop_duplicates()
- dd.merge(df1, df2, left_index=True, right_index=True)
- dd.merge(df1, df2, on='id')
- df1.x + df2.y
- df.resample(...)
- df.rolling(...)
- df[['col1', 'col2']].corr()

Operations requiring a shuffle (slow-ish, unless on index):
- df.set_index(df.x)
- df.groupby(df.x).apply(myfunc)
- dd.merge(df1, df2, on='name')
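A small sketch contrasting these categories on a toy dataframe (the column names and values are made up):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': range(8), 'name': list('aabbccdd')})
toy = dd.from_pandas(pdf, npartitions=2)

(toy.x + 1).compute()                  # trivially parallel: elementwise
toy.groupby('name').x.max().compute()  # cleverly parallel: groupby-aggregate
toy.set_index('name')                  # requires a shuffle of data between partitions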
In [ ]:
import dask.dataframe as dd
In [ ]:
df = dd.read_csv("../data/NationalFoodSurvey/NFS*.csv")
DataFrame.head is one operation that is not lazy.
In [ ]:
df.head(5)
In [ ]:
df.npartitions
In [ ]:
df.known_divisions
Setting the partition divisions on the survey year styr lets dask make some operations more performant. So the divisions
[1974, 1975, 1976]
would give 2 partitions: the first contains 1974; the second contains 1975 and 1976. To get three partitions, one for the final year, duplicate it:
[1974, 1975, 1976, 1976]
In [ ]:
partitions = list(range(1974, 2001)) + [2000]
df = df.set_partition('styr', divisions=partitions)
In [ ]:
df.known_divisions
In [ ]:
df.divisions
In [ ]:
df.info()
In addition to the (supported) pandas DataFrame API, dask provides a few more convenient methods:
- DataFrame.categorize
- DataFrame.map_partitions
- DataFrame.get_division
- DataFrame.repartition
- DataFrame.set_partition
- DataFrame.to_{bag|castra}
- DataFrame.visualize
A few methods have a slightly different API:
- DataFrame.apply
- GroupBy.apply
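For example (a sketch using the dataframe loaded above), map_partitions applies a function to each underlying pandas DataFrame and combines the results:

partition_lengths = df.map_partitions(len)   # one row count per partition
partition_lengths.compute()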
In [ ]:
df2000 = df.get_division(26)
In [ ]:
type(df2000)
What food group was consumed the most times in 2000?
In [ ]:
df2000.set_index('minfd')
In [ ]:
grp = df2000.groupby('minfd')
In [ ]:
size = grp.apply(len, columns='size')
In [ ]:
size.head()
In [ ]:
minfd = size.compute().idxmax()
In [ ]:
print(minfd)
In [ ]:
food_mapping = pd.read_csv("../data/NationalFoodSurvey/food_mapping.csv")
We can look up the description of this food group with the isin method.
In [ ]:
food_mapping.loc[food_mapping.minfd.isin([minfd])]
In [ ]:
# [Solution here]
In [ ]:
%load solutions/nfs_most_purchased.py
In [ ]:
def most_frequent_food(partition):
    # partition is a pandas.DataFrame
    grpr = partition.groupby('minfd')
    size = grpr.size()
    minfd = size.idxmax()
    idx = food_mapping.minfd.isin([minfd])
    description = food_mapping.loc[idx].minfddesc.iloc[0]
    year = int(partition.styr.iloc[0])
    return year, description
In [ ]:
mnfd_year = df.map_partitions(most_frequent_food)
In [ ]:
mnfd_year.compute()
In [ ]:
zip(mnfd_year.compute(),)
Group by minfd and calculate the daily per capita consumption of each food group. Hint: you want to use map_partitions.
In [ ]:
%load solutions/average_consumption.py
In [ ]:
Image('images/bcolz_bench.png')